package com.my.service.task.sink;

import com.my.service.task.entity.ChaomanAndWomenInfo;
import com.my.service.task.util.MongoUtils;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.bson.Document;


public class ChaoManAndWomenSink implements SinkFunction<ChaomanAndWomenInfo> {
    @Override
    public void invoke(ChaomanAndWomenInfo value, Context context) throws Exception {
        String chaotype = value.getChaotype();
        long count = value.getCount();
        Document doc = MongoUtils.findOneBy("chaoManAndWomenstatics", "realtimeportrait", chaotype);
        if (doc == null) {
            doc = new Document();
            doc.put("info", chaotype);
            doc.put("count", count);
        } else {
            Long countpre = doc.getLong("count");
            Long total = countpre + count;
            doc.put("count", total);
        }
        MongoUtils.saveOrUpdateMongo("chaoManAndWomenstatics", "realtimeportrait", doc);
    }
}
